上一篇文章中,我們已經學習到 A2A 的東西,接下來我們就要來改造一下我們的 AI 工具人。
30-18: [知識] 可以讓 AI 工具人知道外面世界的工具 3 - A2A ( Agent2Agent )
讓他變成 :
一個 A2A Server 服務,可以讓其它 Agent 來溝通,也可以去連其它 A2A Server
下面這個是我的 A2A Server 的部份,其中有幾個地方可以注意一下 :
import { v4 as uuidv4 } from "uuid";
import type { AgentCard } from "@a2a-js/sdk";
import {
AgentExecutor,
RequestContext,
ExecutionEventBus,
DefaultRequestHandler,
InMemoryTaskStore,
} from "@a2a-js/sdk/server";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import { ChatWorkflow } from "../workflows/chat.workflow";
let sessionMap: Map<string, ChatWorkflow> = new Map();
const helloAgentCard: AgentCard = {
name: "Hello Agent",
description: "A simple agent that says hello.",
protocolVersion: "0.3.0",
version: "0.1.0",
url: "http://localhost:4000/",
skills: [
{ id: "chat", name: "Chat", description: "Say hello", tags: ["chat"] },
],
capabilities: {},
defaultInputModes: [],
defaultOutputModes: [],
};
class ChatExecutor implements AgentExecutor {
async execute(
requestContext: RequestContext,
eventBus: ExecutionEventBus
): Promise<void> {
console.log("requestContext", requestContext);
let chatWorkflow = sessionMap.get(requestContext.contextId);
if (!chatWorkflow) {
chatWorkflow = new ChatWorkflow();
await chatWorkflow.initialize(requestContext.contextId);
sessionMap.set(requestContext.contextId, chatWorkflow);
}
const message = requestContext.userMessage.parts[0]!.text;
for await (const chunk of chatWorkflow.processMessage(message)) {
eventBus.publish({
kind: "message",
messageId: uuidv4(),
role: "agent",
parts: [{ kind: "text", text: chunk }],
contextId: requestContext.contextId,
});
}
eventBus.finished();
}
cancelTask = async (): Promise<void> => {};
}
const agentExecutor = new ChatExecutor();
const requestHandler = new DefaultRequestHandler(
helloAgentCard,
new InMemoryTaskStore(),
agentExecutor
);
const appBuilder = new A2AExpressApp(requestHandler);
export const a2aExpressAppBuilder = appBuilder;
🤔 註冊 A2A Server 的地方
我這裡是先同時放在同一個 server 內,只是不同 port,至於需不需要拆開,我自已是覺得還好。
import express from "express";
import cors from "cors";
import dotenv from "dotenv";
import chatRoutes from "./routes/chat.js";
import { connectDatabase } from "./infrastructure/mongodb/database.js";
import { a2aExpressAppBuilder } from "./a2a/index.js";
const a2aServer = a2aExpressAppBuilder.setupRoutes(express());
const httpServer = express();
dotenv.config();
httpServer.use(cors());
httpServer.use(express.json());
httpServer.use("/api", chatRoutes);
httpServer.get("/health", (req, res) => {
res.json({ status: "OK", timestamp: new Date().toISOString() });
});
httpServer.listen(3000, async () => {
console.log(`🚀 Http Server is running on http://localhost:3000`);
await connectDatabase();
});
a2aServer.listen(4000, async () => {
console.log(`🚀 A2A Server started on http://localhost:4000`);
});
🤔 備註 EventBus
這個裡面有用到 eventBus 相關的東西,有興趣的可參考這篇之前我寫的文章來理解一下它。
Day-23: Domain Event 之 Transactional OutBox 與 EventBus
事實上沒啥重點,就和官網的差不多。
import { A2AClient } from "@a2a-js/sdk/client";
import {
Message,
MessageSendParams,
SendMessageSuccessResponse,
} from "@a2a-js/sdk";
import { v4 as uuidv4 } from "uuid";
async function run() {
const client = await A2AClient.fromCardUrl(
"http://localhost:4000/.well-known/agent-card.json"
);
const sendParams: MessageSendParams = {
message: {
messageId: uuidv4(),
contextId: "test-context-id",
role: "user",
parts: [{ kind: "text", text: "你好我是馬克大人" }],
kind: "message",
},
};
const response = await client.sendMessage(sendParams);
if ("error" in response) {
console.error("Error:", response.error.message);
} else {
const result = (response as SendMessageSuccessResponse).result as Message;
console.log("Agent response:", result.parts);
}
}
(async () => {
await run();
})();
下面就是我執行 a2a client 後的結果,可以看到,這個 client 的確有和我們的工具人進行溝通。
假設我們這個 AI 工具人,要去連我們家 ( Hahow ) 自已開發的 A2A Server,例如叫 HahowA2A,使用上概念會長的如下圖中,以現在這個 LangGraph 的架構下,要加入也算很簡單,大概只要做到如下程式碼的修改就好。
以下為我們實際上的程式碼,然後這個地方會透過 A2AClient 來取連線到 Hahow A2A Server。
import { A2AClient } from "@a2a-js/sdk/client";
import {
MessageSendParams,
SendMessageSuccessResponse,
Message,
} from "@a2a-js/sdk";
import { v4 as uuidv4 } from "uuid";
export class HahowA2AAgent {
private a2aClient: A2AClient;
constructor(a2aClient: A2AClient) {
this.a2aClient = a2aClient;
}
async callLLM(message: string): Promise<string> {
const sendParams: MessageSendParams = {
message: {
messageId: uuidv4(),
contextId: "test-context-id",
role: "user",
parts: [{ kind: "text", text: message }],
kind: "message",
},
};
const response = await this.a2aClient.sendMessage(sendParams);
const result = (response as SendMessageSuccessResponse).result as Message;
return result.parts[0].text;
}
}
然後在 Workflow 主要就是進行兩件事 :
我這裡就只貼重點就好,不然現在程式碼已經有點長了。
public async initialize(threadId: string) {
...
const a2aClient = await A2AClient.fromCardUrl(
"http://localhost:4001/.well-known/agent-card.json"
);
this.hahowA2AAgent = new HahowA2AAgent(a2aClient);
this.graph = this.buildGraph();
this.currentState = await this.getCurrentState();
}
private buildGraph() {
const workflow = new StateGraph(ChatStateAnnotation)
.addNode(Steps.INITIAL, async (state: ChatState): Promise<ChatState> => {
...
})
.addNode(Steps.ROUTE_AI, async (state: ChatState): Promise<ChatState> => {
...
})
.addNode(
Steps.LEARNING_AI,
async (state: ChatState): Promise<ChatState> => {
...
}
)
.addNode(
Steps.SUMMARY_AI,
async (state: ChatState): Promise<ChatState> => {
...
}
)
.addNode(
Steps.BACKGROUND_AI,
async (state: ChatState): Promise<ChatState> => {
....
}
)
.addNode(
Steps.HAHOW_A2A_AI,
async (state: ChatState): Promise<ChatState> => {
const response = await this.testE2eAgent!.callLLM(state.query);
return {
messages: [new AIMessage(response)],
query: state.query,
intent: state.intent,
background: state.background,
step: Steps.HAHOW_A2A_AI,
};
}
)
.addEdge(START, Steps.INITIAL)
.addEdge(Steps.INITIAL, Steps.ROUTE_AI)
.addConditionalEdges(Steps.ROUTE_AI, (state: ChatState) => {
if (!state.intent) {
return END;
}
if (state.intent === Intent.SUMMARY) {
return Steps.SUMMARY_AI;
}
if (state.intent === Intent.HAHOW) {
return Steps.HAHOW_A2A_AI;
}
return Steps.BACKGROUND_AI;
})
.addEdge(Steps.SUMMARY_AI, END)
.addConditionalEdges(Steps.BACKGROUND_AI, (state: ChatState) => {
return Steps.LEARNING_AI;
if (state.background) {
return Steps.LEARNING_AI;
}
return END;
})
.addEdge(Steps.LEARNING_AI, END);
if (!this.checkpointSaver) {
throw new Error("Checkpoint saver is not initialized");
}
return workflow.compile({
checkpointer: this.checkpointSaver,
});
}
這篇文章中我們已經讓我們的 AI 工具人變成了一個可以讓其它 Agent 來連的 A2A Server,並且也可以連到其它的 A2A Server 來增加我們這個工具人的功能。
不過先說好,這個基本上是最簡單的版本,如果還要進行到 Agent 與 Agent 的更多互動,可能就還需要花時調整了。
提外話,寫到這篇後真的發現用 LangGraph 來開發真的輕鬆不少,要加個 A2A 也很簡單,不過它真很多東西都還沒更新上 1.0.0 啊……